{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "from pyspark.conf import SparkConf\n",
    "from pyspark.sql import SparkSession\n",
    "import pandas as pd\n",
    "\n",
    "# This CATALOG_URL works for the \"docker compose\" testing and development environment\n",
    "# Change 'lakekeeper' if you are not running on \"docker compose\" (f. ex. 'localhost' if Lakekeeper is running locally).\n",
    "CATALOG_URL = \"http://lakekeeper:8181/catalog\"\n",
    "WAREHOUSE = \"demo\"\n",
    "\n",
    "SPARK_VERSION = pyspark.__version__\n",
    "SPARK_MINOR_VERSION = '.'.join(SPARK_VERSION.split('.')[:2])\n",
    "ICEBERG_VERSION = \"1.6.1\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Connect with Spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "config = {\n",
    "    f\"spark.sql.catalog.lakekeeper\": \"org.apache.iceberg.spark.SparkCatalog\",\n",
    "    f\"spark.sql.catalog.lakekeeper.type\": \"rest\",\n",
    "    f\"spark.sql.catalog.lakekeeper.uri\": CATALOG_URL,\n",
    "    f\"spark.sql.catalog.lakekeeper.warehouse\": WAREHOUSE,\n",
    "    f\"spark.sql.catalog.lakekeeper.io-impl\": \"org.apache.iceberg.aws.s3.S3FileIO\",\n",
    "    \"spark.sql.extensions\": \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\",\n",
    "    \"spark.sql.defaultCatalog\": \"lakekeeper\",\n",
    "    \"spark.jars.packages\": f\"org.apache.iceberg:iceberg-spark-runtime-{SPARK_MINOR_VERSION}_2.12:{ICEBERG_VERSION},org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}\",\n",
    "}\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark_config = SparkConf().setMaster('local').setAppName(\"Iceberg-REST\")\n",
    "for k, v in config.items():\n",
    "    spark_config = spark_config.set(k, v)\n",
    "\n",
    "spark = SparkSession.builder.config(conf=spark_config).getOrCreate()\n",
    "\n",
    "spark.sql(\"USE lakekeeper\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Read and Write Tables"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.sql(f\"CREATE NAMESPACE IF NOT EXISTS my_namespace\")\n",
    "spark.sql(\"SHOW NAMESPACES\").toPandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "data = pd.DataFrame([[1, 'a-string', 2.2]], columns=['id', 'strings', 'floats'])\n",
    "sdf = spark.createDataFrame(data)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sdf.writeTo(f\"my_namespace.my_table\").createOrReplace()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.sql(f\"SELECT * FROM my_namespace.my_table\").toPandas()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
